package com.my;

import com.my.conf.MyRedisCluster;
import com.my.mapper.MyRedisMapper;
import com.my.mapper.MyRedisMapperHash;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;

/**
 * Entry point of a Flink streaming job that reads text from a socket and
 * hands every record to a {@link RedisSink}.
 */
public class App {

    /**
     * Assembles the pipeline (socket text source -> Redis sink) and submits it
     * for execution.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: text read from host "hadoop102", port 7777.
        final DataStreamSource<String> socketLines =
                environment.socketTextStream("hadoop102", 7777);

        // Sink: connection settings and the record-to-command mapping both come
        // from project helpers defined outside this file.
        final RedisSink<String> redisSink = new RedisSink<String>(
                MyRedisCluster.getConf(),
                MyRedisMapperHash.getMapper());
        socketLines.addSink(redisSink);

        // The job name is a runtime identifier and is kept exactly as it was.
        environment.execute("scoket_to_redis");
    }

}
